分别对应DubboCommunicationConnection与RmiCommunicationConnection,这两个都实现CommunicationConnection
public interface CommunicationConnection {
public Object call(Event event);
public CommunicationParam getParams();
public void close() throws CommunicationException;
}
其创建和关闭都在中CommunicationConnectionFactory管理
public interface CommunicationConnectionFactory {
CommunicationConnection createConnection(CommunicationParam params);
void releaseConnection(CommunicationConnection connection);
}
DubboCommunicationConnectionFactory中能看到dubbo的相关代码
private final String DUBBO_SERVICE_URL = "dubbo://{0}:{1}/endpoint?client=netty&codec=dubbo&serialization=java&lazy=true&iothreads=4&threads=50&connections=30&acceptEvent.timeout=50000&payload={2}";
private DubboProtocol protocol = DubboProtocol.getDubboProtocol();
private ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class)
.getExtension("javassist");
public DubboCommunicationConnectionFactory(){
connections = OtterMigrateMap.makeComputingMap(new Function<String, CommunicationEndpoint>() {
public CommunicationEndpoint apply(String serviceUrl) {
return proxyFactory.getProxy(protocol.refer(CommunicationEndpoint.class, URL.valueOf(serviceUrl)));
}
});
}
public CommunicationConnection createConnection(CommunicationParam params) {
if (params == null) {
throw new IllegalArgumentException("param is null!");
}
// 构造对应的url, String.valueOf() 为避免数字包含千位符
String serviceUrl = MessageFormat.format(DUBBO_SERVICE_URL, params.getIp(), String.valueOf(params.getPort()), String.valueOf(payload));
CommunicationEndpoint endpoint = connections.get(serviceUrl);
return new DubboCommunicationConnection(params, endpoint);
}
服务注册与action绑定
//ConfigRemoteServiceImpl.java
public ConfigRemoteServiceImpl(){
// 注册一下事件处理
CommunicationRegistry.regist(ConfigEventType.findChannel, this);
CommunicationRegistry.regist(ConfigEventType.findNode, this);
CommunicationRegistry.regist(ConfigEventType.findTask, this);
CommunicationRegistry.regist(ConfigEventType.findMedia, this);
}
//CanalRemoteServiceImpl.java
public CanalRemoteServiceImpl(){
CommunicationRegistry.regist(CanalEventType.findCanal, this);
CommunicationRegistry.regist(CanalEventType.findFilter, this);
}
CommunicationClient是客户端的接口,默认实现DefaultCommunicationClientImpl,DefaultCommunicationClientImpl内部才是调用的是CommunicationConnection的
canal服务为例跟踪下调用流程 客户端调用:
Canal canal = canalConfigClient.findCanal(destination);
canalConfigClient中的findCanal()
//canalConfigClient.java
public Canal findCanal(String destination) {
FindCanalEvent event = new FindCanalEvent();
event.setDestination(destination);
try {
Object obj = delegate.callManager(event);
if (obj != null && obj instanceof Canal) {
return (Canal) obj;
} else {
throw new CanalException("No Such Canal by [" + destination + "]");
}
} catch (Exception e) {
throw new CanalException("call_manager_error", e);
}
CanalCommmunicationClient
//CanalCommmunicationClient
public Object callManager(final Event event) {
CommunicationException ex = null;
Object object = null;
for (int i = index; i < index + managerAddress.size(); i++) { // 循环一次manager的所有地址
String address = managerAddress.get(i % managerAddress.size());
try {
object = delegate.call(address, event);
index = i; // 更新一下上一次成功的地址
return object;
} catch (CommunicationException e) {
// retry next address;
ex = e;
}
}
throw ex; // 走到这一步,说明肯定有出错了
}
DefaultCommunicationClientImpl才会创建连接,远程通信
//DefaultCommunicationClientImpl
public Object call(final String addr, final Event event) {
Assert.notNull(this.factory, "No factory specified");
CommunicationParam params = buildParams(addr);
CommunicationConnection connection = null;
int count = 0;
Throwable ex = null;
while (count++ < retry) {
try {
connection = factory.createConnection(params);
return connection.call(event);
} catch (Exception e) {
logger.error(String.format("call[%s] , retry[%s]", addr, count), e);
try {
Thread.sleep(count * retryDelay);
} catch (InterruptedException e1) {
// ignore
}
ex = e;
} finally {
if (connection != null) {
connection.close();
}
}
}
logger.error("call[{}] failed , event[{}]!", addr, event.toString());
throw new CommunicationException("call[" + addr + "] , Event[" + event.toString() + "]", ex);
}
DubboCommunicationConnection包装了endpoint
//DubboCommunicationConnection
public Object call(Event event) {
// 调用rmi传递数据到目标server上
return endpoint.acceptEvent(event);
}
AbstractCommunicationEndpoint中查找服务与对应的事件,通过反射调用服务
//AbstractCommunicationEndpoint
public Object acceptEvent(Event event) {
if (event instanceof HeartEvent) {
return event; // 针对心跳请求,返回一个随意结果
}
try {
Object action = CommunicationRegistry.getAction(event.getType());
if (action != null) {
// 通过反射获取方法并执行
String methodName = "on" + StringUtils.capitalize(event.getType().toString());
Method method = ReflectionUtils.findMethod(action.getClass(), methodName,
new Class[] { event.getClass() });
if (method == null) {
methodName = DEFAULT_METHOD; // 尝试一下默认方法
method = ReflectionUtils.findMethod(action.getClass(), methodName, new Class[] { event.getClass() });
if (method == null) { // 再尝试一下Event参数
method = ReflectionUtils.findMethod(action.getClass(), methodName, new Class[] { Event.class });
}
}
// 方法不为空就调用指定的方法,反之调用缺省的处理函数
if (method != null) {
try {
ReflectionUtils.makeAccessible(method);
return method.invoke(action, new Object[] { event });
} catch (Throwable e) {
throw new CommunicationException("method_invoke_error:" + methodName, e);
}
} else {
throw new CommunicationException("no_method_error for["
+ StringUtils.capitalize(event.getType().toString())
+ "] in Class[" + action.getClass().getName() + "]");
}
}
throw new CommunicationException("eventType_no_action", event.getType().name());
} catch (RuntimeException e) {
logger.error("endpoint_error", e);
throw e;
} catch (Exception e) {
logger.error("endpoint_error", e);
throw new CommunicationException(e);
}
}
}
实现的remote服务实现,命名一般为xxxRemoteServiceImpl,内部是通常的javabean服务 如:CanalRemoteServiceImpl
public Canal onFindCanal(FindCanalEvent event) {
String destination = event.getDestination();
return canalService.findByName(destination);
}